package org.lionel.task;

import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

public class PendingJobPool {

    /**
     * 框架运行时的线程数，与机器的CPU数相同
     */
    private static final int THREAD_COUNTS = Runtime.getRuntime().availableProcessors();
    /**
     * 用以存放待处理的任务，供线程池使用
     */
    private static BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(5000);
    /**
     * 线程池，固定大小，有界队列
     */
    private static ExecutorService taskExecutor
            = new ThreadPoolExecutor(THREAD_COUNTS, THREAD_COUNTS, 60, TimeUnit.SECONDS, taskQueue);
    /**
     * 工作信息的存放容器
     */
    private static ConcurrentHashMap<String, JobInfo<?>> jobInfoMap = new ConcurrentHashMap<>();

    public static Map<String, JobInfo<?>> getMap() {
        return jobInfoMap;
    }

    private PendingJobPool() {

    }

    /**
     * 单例模式
     */
    private static PendingJobPool threadPool = new PendingJobPool();

    public static PendingJobPool getInstance() {
        return threadPool;
    }

    private static class PendingTask<T, R> implements Runnable {

        private JobInfo<R> jobInfo;
        // 任务参数
        private T processData;

        public PendingTask(JobInfo<R> jobInfo, T processData) {
            this.jobInfo = jobInfo;
            this.processData = processData;
        }

        @Override
        public void run() {
            R r = null;
            ITaskProcesser<T, R> taskProcesser = (ITaskProcesser<T, R>) jobInfo.getTaskProcesser();
            TaskResult<R> result = null;
            try {
                result = taskProcesser.taskExecute(processData);
                if (result == null) {
                    result = new TaskResult<R>(TaskResultType.EXCEPTION, r, "结果为空");
                }
                if (result.getTaskResultType() == null) {
                    if (result.getReason() == null) {
                        result = new TaskResult<R>(TaskResultType.EXCEPTION, r, "结果为空");
                    } else {
                        result = new TaskResult<R>(TaskResultType.EXCEPTION, r, "结果为空, 原因：" + result.getReason());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                result = new TaskResult<R>(TaskResultType.EXCEPTION, r, e.getMessage());
            } finally {
                // 江结果添加到延时队列及结果队列，同时计数
                jobInfo.addTaskResult(result);
            }
        }
    }

    /**
     * 调用者提交工作中的任务
     */
    public <T, R> void putTask(String jobName, T t) {
        JobInfo<R> jobInfo = getJob(jobName);
        PendingTask<T, R> task = new PendingTask<>(jobInfo, t);
        taskExecutor.execute(task);
    }

    /**
     * 根据工作名称检索工作
     */
    private <R> JobInfo<R> getJob(String jobName) {
        JobInfo<R> jobInfo = (JobInfo<R>) jobInfoMap.get(jobName);
        if (null == jobInfo) {
            throw new RuntimeException(jobName + "是非法任务！");
        }
        return jobInfo;
    }

    /**
     * 调用者注册工作，如工作名，任务的处理器等等
     */
    public <R> void registerJob(String jobName, int jobLength,
                                ITaskProcesser<?, ?> taskProcesser, long expireTime) {
        JobInfo<R> jobInfo = new JobInfo<R>(jobName, jobLength, taskProcesser, expireTime);
        // putIfAbsent 如果传入key对应的value已经存在，就返回存在的value，不进行替换。如果不存在，就添加key和value，返回null
        if (jobInfoMap.putIfAbsent(jobName, jobInfo) != null) {
            throw new RuntimeException(jobName + "已经注册！");
        }
    }

    /**
     * 获得工作的整体处理进度
     */
    public <R> String getTaskProgess(String jobName) {
        JobInfo<R> jobInfo = getJob(jobName);
        return jobInfo.getTotalProcess();
    }

    /**
     * 获得每个任务的处理详情
     */
    public <R> List<TaskResult<R>> getTaskDetail(String jobName) {
        JobInfo<R> jobInfo = getJob(jobName);
        return jobInfo.getTaskDetail();
    }
}
